{
	"cells": [
		{
			"cell_type": "markdown",
			"metadata": {
				"editable": true,
				"trusted": true
			},
			"source": [
				"# Load data incrementally from Apache Hudi dataset to Amazon Redshift\n"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {
				"editable": true,
				"trusted": true
			},
			"source": [
				"####  Run this cell to set up and start your interactive session.\n"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"editable": true,
				"trusted": true
			},
			"outputs": [],
			"source": [
				"%session_id_prefix hudi-redshift-incremental-\n",
				"%idle_timeout 2880\n",
				"%glue_version 4.0\n",
				"%worker_type G.1X\n",
				"%number_of_workers 5\n",
				"%connections redshift\n",
				"%%configure\n",
				"{\n",
				"    \"--datalake-formats\": \"hudi\"\n",
				"}"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Configure your resources"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"AWS_ACCOUNT_ID = \"123456789101\"\n",
				"REGION = \"us-east-1\"\n",
				"\n",
				"HUDI_DATASET_PATH = \"s3://<Your S3 bucket>/<Your S3 prefix>/hudi_incremental/ghcn/\"\n",
				"\n",
				"REDSHIFT_CONNECTION_NAME = \"redshift\"\n",
				"REDSHIFT_IAM_ROLE_ARN = \"arn:aws:iam::123456789101:role/RedshiftSpectrumRole\"\n",
				"REDSHIFT_SCHEMA = \"public\"\n",
				"REDSHIFT_TABLE_NAME = \"ghcn\"\n",
				"REDSHIFT_TABLE_PRIMARY_KEYS = [\"ID\", \"ELEMENT\"]"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Initialize SparkSession and GlueContext"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"import sys\n",
				"from datetime import datetime\n",
				"import boto3\n",
				"from botocore.exceptions import ClientError\n",
				"from awsglue.transforms import *\n",
				"from awsglue.utils import getResolvedOptions\n",
				"from pyspark.context import SparkContext\n",
				"from awsglue.context import GlueContext\n",
				"from awsglue.job import Job\n",
				"\n",
				"sc = SparkContext.getOrCreate()\n",
				"glueContext = GlueContext(sc)\n",
				"spark = glueContext.spark_session\n",
				"\n",
				"params = []\n",
				"if '--JOB_NAME' in sys.argv:\n",
				"    params.append('JOB_NAME')\n",
				"if '--TempDir' in sys.argv:\n",
				"    params.append('TempDir')\n",
				"args = getResolvedOptions(sys.argv, params)\n",
				"\n",
				"job_name = None\n",
				"if 'JOB_NAME' in args:\n",
				"    job_name = args['JOB_NAME']\n",
				"if not job_name:\n",
				"    job_name = \"hudi-ghcn-incremental-load-notebook\"\n",
				"\n",
				"if 'TempDir' in args:\n",
				"    temp_dir = args['TempDir']\n",
				"if not temp_dir:\n",
				"    temp_dir = f\"s3://aws-glue-assets-{AWS_ACCOUNT_ID}-{REGION}/temporary/\"\n",
				"\n",
				"jdbc_conf = glueContext.extract_jdbc_conf(connection_name=REDSHIFT_CONNECTION_NAME)\n"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Determine target time range for incremental query"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"glue = boto3.client('glue')\n",
				"\n",
				"try:\n",
				"    res = glue.get_tags(ResourceArn=f\"arn:aws:glue:{REGION}:{AWS_ACCOUNT_ID}:job/{job_name}\")\n",
				"    if 'Tags' in res and 'lastQueryEndTime' in res['Tags']:\n",
				"        beginTime = res['Tags']['lastQueryEndTime']\n",
				"    else:\n",
				"        beginTime = \"000\" ### retrieve all\n",
				"except Exception as e:\n",
				"    raise Exception(\"Failed to retrieve lastQueryEndTime tag via get_tags: \" + e.__str__())\n",
				"\n",
				"endTime = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
				"\n",
				"print(f\"beginTime: {beginTime}\")\n",
				"print(f\"endTime: {endTime}\")"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Run query"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"df = spark.read.format(\"hudi\") \\\n",
				"    .option(\"hoodie.datasource.query.type\", \"incremental\") \\\n",
				"    .option(\"hoodie.datasource.read.begin.instanttime\", beginTime) \\\n",
				"    .option(\"hoodie.datasource.read.end.instanttime\", endTime) \\\n",
				"    .load(HUDI_DATASET_PATH)"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"df.show()"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Merge changes into destination table"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"column_names = [f.name for f in df.schema.fields]\n",
				"print(column_names)"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"tmp_table_name = f\"{REDSHIFT_TABLE_NAME}_tmp\"\n",
				"\n",
				"post_actions = f\"BEGIN; CREATE TABLE IF NOT EXISTS {REDSHIFT_SCHEMA}.{REDSHIFT_TABLE_NAME} (LIKE {REDSHIFT_SCHEMA}.{tmp_table_name}); \"\n",
				"post_actions += f\"MERGE INTO {REDSHIFT_SCHEMA}.{REDSHIFT_TABLE_NAME} USING {REDSHIFT_SCHEMA}.{tmp_table_name} ON \"\n",
				"\n",
				"post_actions += ' AND '.join(f\"{REDSHIFT_SCHEMA}.{REDSHIFT_TABLE_NAME}.{pk} = {REDSHIFT_SCHEMA}.{tmp_table_name}.{pk}\" for pk in REDSHIFT_TABLE_PRIMARY_KEYS)\n",
				"\n",
				"post_actions += \" WHEN MATCHED THEN UPDATE SET \"\n",
				"post_actions += ', '.join(f\"{col} = {REDSHIFT_SCHEMA}.{tmp_table_name}.{col}\" for col in column_names)\n",
				"\n",
				"post_actions += \" WHEN NOT MATCHED THEN INSERT VALUES (\"\n",
				"post_actions += ', '.join(f\"{REDSHIFT_SCHEMA}.{tmp_table_name}.{col}\" for col in column_names)\n",
				"\n",
				"post_actions += f\"); DROP TABLE {REDSHIFT_SCHEMA}.{tmp_table_name}; END;\"\n",
				"\n",
				"print(f\"post_actions: {post_actions}\")"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"df.write \\\n",
				"  .format(\"io.github.spark_redshift_community.spark.redshift\") \\\n",
				"  .option(\"url\", jdbc_conf[\"fullUrl\"]) \\\n",
				"  .option(\"user\", jdbc_conf[\"user\"]) \\\n",
				"  .option(\"password\", jdbc_conf[\"password\"]) \\\n",
				"  .option(\"dbtable\", tmp_table_name) \\\n",
				"  .option(\"postactions\", post_actions) \\\n",
				"  .option(\"tempdir\", temp_dir) \\\n",
				"  .option(\"aws_iam_role\", REDSHIFT_IAM_ROLE_ARN) \\\n",
				"  .mode(\"error\") \\\n",
				"  .save()"
			]
		},
		{
			"cell_type": "markdown",
			"metadata": {},
			"source": [
				"#### Update the last query end time"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {
				"tags": [],
				"trusted": true
			},
			"outputs": [],
			"source": [
				"tag = {\"lastQueryEndTime\": endTime}\n",
				"\n",
				"try:\n",
				"    glue.tag_resource(ResourceArn=f\"arn:aws:glue:{REGION}:{AWS_ACCOUNT_ID}:job/{job_name}\",TagsToAdd=tag)\n",
				"except Exception as e:\n",
				"    raise Exception(\"Failed to update lastQueryEndTime tag via tags_resource: \" + e.__str__())"
			]
		},
		{
			"cell_type": "code",
			"execution_count": null,
			"metadata": {},
			"outputs": [],
			"source": []
		}
	],
	"metadata": {
		"kernelspec": {
			"display_name": "Glue PySpark",
			"language": "python",
			"name": "glue_pyspark"
		},
		"language_info": {
			"codemirror_mode": {
				"name": "python",
				"version": 3
			},
			"file_extension": ".py",
			"mimetype": "text/x-python",
			"name": "Python_Glue_Session",
			"pygments_lexer": "python3"
		}
	},
	"nbformat": 4,
	"nbformat_minor": 4
}
